{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exercise 6 - Handling Slow Tasks\n",
    "\n",
    "**GOAL:** The goal of this exercise is to show how to use `ray.wait` to avoid waiting for slow tasks.\n",
    "\n",
    "See the documentation for `ray.wait` at https://ray.readthedocs.io/en/latest/package-ref.html?highlight=ray.wait#ray.wait.\n",
    "\n",
    "### Concepts for this Exercise - ray.wait\n",
    "\n",
    "After launching a number of tasks, you may want to know which ones have finished executing. This can be done with `ray.wait`, which is called as follows.\n",
    "\n",
    "```python\n",
    "ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)\n",
    "```\n",
    "\n",
    "**Arguments:**\n",
    "- `object_ids`: This is a list of object IDs.\n",
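    "- `num_returns`: This is the maximum number of object IDs to wait for. The default value is `1`.\n",
    "- `timeout`: This is the maximum amount of time to wait, in milliseconds. `ray.wait` blocks until either `num_returns` objects are ready or `timeout` milliseconds have passed. The default value is `None`, which means there is no time limit. (Newer Ray releases interpret `timeout` in seconds, so check the documentation for your version.)\n",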
    "\n",
    "**Return values:**\n",
    "- `ready_ids`: This is a list of object IDs that are available in the object store.\n",
    "- `remaining_ids`: This is a list of the IDs that were in `object_ids` but are not in `ready_ids`, so the IDs in `ready_ids` and `remaining_ids` together make up all the IDs in `object_ids`."
   ]
  },
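  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As an illustration of the semantics described above, here is a minimal sketch of processing results in the order in which tasks finish. The remote function `slow_square` and the variable names are hypothetical and not part of this exercise, and the sketch assumes Ray can be started locally with default settings.\n",
    "\n",
    "```python\n",
    "import time\n",
    "\n",
    "import ray\n",
    "\n",
    "ray.init(ignore_reinit_error=True)\n",
    "\n",
    "\n",
    "@ray.remote\n",
    "def slow_square(x):\n",
    "    # Hypothetical task: larger inputs sleep longer before returning.\n",
    "    time.sleep(0.1 * x)\n",
    "    return x * x\n",
    "\n",
    "\n",
    "pending_ids = [slow_square.remote(x) for x in range(4)]\n",
    "while pending_ids:\n",
    "    # Block until at least one of the pending tasks has finished.\n",
    "    ready_ids, pending_ids = ray.wait(pending_ids, num_returns=1)\n",
    "    print(ray.get(ready_ids[0]))\n",
    "```\n",
    "\n",
    "Passing the returned list of remaining IDs back into `ray.wait` on each iteration means every task is handled exactly once, as soon as its result is available."
   ]
  },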
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "import numpy as np\n",
    "import ray\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.init(num_cpus=6, include_webui=False, ignore_reinit_error=True)\n",
    "\n",
    "# Sleep a little to improve the accuracy of the timing measurements used below,\n",
    "# because some workers may still be starting up in the background.\n",
    "time.sleep(2.0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a remote function that takes a variable amount of time to run."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@ray.remote\n",
    "def f(i):\n",
    "    np.random.seed(5 + i)\n",
    "    x = np.random.uniform(0, 4)\n",
    "    time.sleep(x)\n",
    "    return i, time.time()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Using `ray.wait`, change the code below so that `initial_results` consists of the outputs of the first three tasks to complete instead of the first three tasks that were submitted."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "start_time = time.time()\n",
    "\n",
    "# This launches 6 tasks, each of which takes a random amount of time to\n",
    "# complete.\n",
    "result_ids = [f.remote(i) for i in range(6)]\n",
    "# Get one batch of results. Rather than waiting for a fixed subset of tasks,\n",
    "# we should use the first 3 tasks that finish.\n",
    "initial_results = ray.get(result_ids[:3])\n",
    "\n",
    "end_time = time.time()\n",
    "duration = end_time - start_time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Change the code below so that `remaining_results` consists of the outputs of the last three tasks to complete."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Wait for the remaining tasks to complete.\n",
    "remaining_results = ray.get(result_ids[3:])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "assert len(initial_results) == 3\n",
    "assert len(remaining_results) == 3\n",
    "\n",
    "initial_indices = [result[0] for result in initial_results]\n",
    "initial_times = [result[1] for result in initial_results]\n",
    "remaining_indices = [result[0] for result in remaining_results]\n",
    "remaining_times = [result[1] for result in remaining_results]\n",
    "\n",
    "assert set(initial_indices + remaining_indices) == set(range(6))\n",
    "\n",
    "assert duration < 1.5, ('The initial batch of three tasks was retrieved in '\n",
    "                        '{} seconds. This is too slow.'.format(duration))\n",
    "\n",
    "assert duration > 0.8, ('The initial batch of three tasks was retrieved in '\n",
    "                        '{} seconds. This is too fast.'.format(duration))\n",
    "\n",
    "# Make sure the initial results actually completed first.\n",
    "assert max(initial_times) < min(remaining_times)\n",
    "\n",
    "print('Success! The example took {} seconds.'.format(duration))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": true
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
